基于Locust实现MQTT协议服务的压测脚本

您所在的位置:网站首页 mqtt client app 基于Locust实现MQTT协议服务的压测脚本

基于Locust实现MQTT协议服务的压测脚本

#基于Locust实现MQTT协议服务的压测脚本| 来源: 网络整理| 查看: 265

最近在忙业务的间隙,穿插着做了些性能测试。

一、背景简介

业务背景大概介绍一下,就是按照国标规定,车辆需要上传一些指定的数据到ZF的指定平台,同时车辆也会把数据传到企业云端服务上,于是乎就产生了一些性能需求。

目前我们只是先简单的进行了一个性能场景的测试,就是评估目前服务是否能够支持,预期的最大同时在线车辆上传数据。经过评估,在线车辆数据按照预期的10倍来进行的,并且后面增加持续运行12h查看服务链路的稳定性。

本篇并不是一个严谨的性能测试过程结果分享,主要是分享下关于mqtt协议服务的压测脚本的编写。因为之前我也没接触过MQTT协议的压测,网上关于相关的压测脚本的内容也比较杂乱,所以记录一下,仅供参考。

捋一下链路就知道需要生成哪些数据(因为服务还未上线使用,所以产生的压测数据后面可以直接清理掉即可。):

一些前置数据:比如数据库、缓存里涉及到的车辆数据,通信秘钥数据等等,这些可以之前写脚本一次性生成即可。 车辆上报的数据:车辆上报到云端的数据,是经过一系列加密转码,期间还要设计到解密等,这个经过评估,可以简化其中的某些环境,所以所有的车可以直接发送相同的数据即可。 车辆数据:最后就是生成对应的车辆数据,同时在线,按照评估的频率发送数据。

其中第1、2的数据在之前针对性的分别生成即可,第3步的车辆发送数据就是压测脚本要干的事情了。

二、技术选型

这个倒是很快,搜索引擎大概搜了一下,内容很少,或者说对我有用的内容很少。有看到jmeter有相关插件的,但是这个方案基本上我都是否决的,一来我不擅长用,而来我觉得用起来肯定会比自己编码要麻烦的多。

所以就继续编码好了,仍然首选python,想到了locust库,后来看官方文档的时候,看到locust也针对mqtt协议拓展了一些内容。但是我尝试下来不太符合我这的需求,也可能当时我用的不对吧,所以就只能自己来从零开始编写了。

搜索中又发现Python中用于mqtt协议的库叫paho.mqtt,支持连接代理,消息的订阅、收发等等,于是最后确定使用:locust+paho.mqtt的组合来实现本次的负载脚本。

三、代码编写 1. 脚本代码

暂时没做代码分层,目前场景简单,就直接都放一个模块里了,有点长,先贴上来,后面部分会对脚本的重点内容进行拆解。

脚本目前做了这些事情:

从db中查询有效可用的所有测试车辆信息数据 根据命令行的输入参数,指定启动的车辆数,以及与broker代理建立连接的频率 建立连接成功的车辆,就可以根据脚本里指定的频次,来像broker发送数据 脚本统计连接数、请求数、响应时间等信息写到报表中 调试遇到车辆会批量断开连接的情况,增加了当车辆断开连接时,把断开时间、车辆信息写到本地csv中,方便第二天来查看分析。 import csv import datetime import queue import os import sys import time import ssl from paho.mqtt import client as mqtt_client # 根据不同系统进行路径适配 if os.name == "nt": path = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) sys.path.insert(0, path) from GB_test.utils.mysql_operating import DB elif os.name == "posix": sys.path.append("/app/qa_test_app/") from GB_test.utils.mysql_operating import DB from locust import User, TaskSet, events, task, between, run_single_user BROKER_ADDRESS = "broker服务地址" PORT = 1111 PASSWORD = "111111" PUBLISH_TIMEOUT = 10000 # 超时时间 TEST_TOPIC = "test_topic" TEST_VALUE = [16, 3, -26, 4, 0, 36,.......] # 用来publish的测试数据,仅示意 BYTES_DATA = bytes(i % 256 for i in TEST_VALUE) # 业务需要转换成 byte 类型后再发送 # 创建队列 client_queue = queue.Queue() # 连接DB,读取车辆数据 db = DB("db_vmd") select_sql = "select xxxx" client_list = db.fetch_all(select_sql) print("车辆数据查询完毕,数据量:{}".format(len(client_list))) for t in client_list: # 把可用的车辆信息存到队列中去 client_queue.put(t) def fire_success(**kwargs): """请求成功时调用""" events.request.fire(**kwargs) def calculate_resp_time(t1, t2): """计算响应时间""" return int((t2 - t1) * 1000) class MQTTMessage: """已发送的消息实体类""" def __init__(self, _type, qos, topic, payload, start_time, timeout): self.type = _type, self.qos = qos, self.topic = topic self.payload = payload self.start_time = start_time self.timeout = timeout # 统计总共发送成功的消息数量 total_published = 0 disconnect_record_list = [] # 定义存放连接断开的记录的列表容器 class PublishTask(TaskSet): @task def task_publish(self): self.client.loop_start() topic = TEST_TOPIC payload = BYTES_DATA # 记录发送的开始时间 start_time = time.time() mqtt_msg_info = self.client.publish(topic, payload, qos=1, retain=False) published_mid = mqtt_msg_info.mid # 将发送成功的消息内容,放入client实例的 published_message 字段 self.client.published_message[published_mid] = MQTTMessage(REQUEST_TYPE, 0, topic, payload, start_time, PUBLISH_TIMEOUT) # 发送成功回调 self.client.on_publish = self.on_publish # 断开连接回调 self.client.on_disconnect = self.on_disconnect @staticmethod def on_disconnect(client, userdata, rc): """ broker连接断开,放入列表容器""" disconnected_info = [str(client._client_id), rc, datetime.datetime.now()] disconnect_record_list.append(disconnected_info) print("rc状态:{} - -".format(rc), "{}-broker连接已断开".format(str(client._client_id))) @staticmethod def on_publish(client, userdata, mid): if mid: # 记录消息发送成功的时间 end_time = time.time() # 从已发送的消息容器中,取出消息 message = client.published_message.pop(mid, None) # 计算开始发送到发送成功的耗时 publish_resp_time = calculate_resp_time(message.start_time, end_time) fire_success( request_type="p_success", name="client_id: " + str(client._client_id), response_time=publish_resp_time, response_length=len(message.payload), exception=None, context=None ) global total_published # 成功发送累加1 total_published += 1 class MQTTLocustUser(User): tasks = [PublishTask] wait_time = between(2, 2) def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # 从队列中获取客户端 username 和 client_id current_client = client_queue.get() self.client = mqtt_client.Client(current_client[1]) self.client.username_pw_set(current_client[0], PASSWORD) # self.client.username_pw_set(current_client[0] + "1", PASSWORD) # 模拟client连接报错 # 定义一个容器,存放已发送的消息 self.client.published_message = {} def on_start(self): # 设置tls context = ssl.SSLContext(ssl.PROTOCOL_TLS) self.client.tls_set_context(context) self.client.connect(host=BROKER_ADDRESS, port=PORT, keepalive=60) self.client.on_connect = self.on_connect def on_stop(self): print("publish 成功, 当前已成功发送数量:{}".format(total_published)) if len(disconnect_record_list) == 0: print("无断开连接的client") else: # 把断开记录里的信息写入csv with open("disconnect_record.csv", "w", newline='', encoding='UTF8') as csvfile: writer = csv.writer(csvfile) writer.writerow(['client_id', 'rc_status', 'disconnected_time']) for i in disconnect_record_list: writer.writerow(i) print("断开连接的client信息已写入csv文件") @staticmethod def on_connect(client, userdata, flags, rc, props=None): if rc == 0: print("rc状态:{} - -".format(rc), "{}-连接broker成功".format(str(client._client_id))) fire_success( request_type="c_success", name='count_connected', response_time=0, response_length=0, exception=None, context=None ) else: print("rc状态:{} - -".format(rc), "{}-连接broker失败".format(str(client._client_id))) fire_success( request_type="c_fail", name="client_id: " + str(client._client_id), response_time=0, response_length=0, exception=None, context=None ) if __name__ == '__main__': run_single_user(MQTTLocustUser) 2. 代码分析-locust库部分

并发请求能力还是使用的locust库的能力。官方只提供了http协议接口的相关类,没直接提供mqtt协议的,但是我们可以按照官方的规范,自定义相关的类,只要继承User和TaskSet即可。

User类

首先是先定义User类,这里就是用来生成我要用来测试的车辆。

类初始化的时候,黄色框里,会去队列里取出车辆信息,用来做一些相关的设置。client来源于from paho.mqtt import client as mqtt_client提供的能力,固定用法,按照人家的文档使用就行。

红色框里,是User类的2个重要熟悉属性:

tasks: 这里定义了生成的用户需要去干哪些事情,也就是对应脚本里的PublishTask类下面定义的内容。 wait_time: 用户在执行task时间隔停留的时间,可以是个区间,在里面随机。我这里意思是每2s发送一次数据到broker。

绿色框里,定义了一个字典容器,用来存放当前用户已发送成功的消息内容,因为后面我要取出来把里面相关的数据写到生成的报表中去。

蓝色框里有2个方法,也是locust提供的能力:

on_start:当用户开始运行时调用,这里我做了车辆连接broker代理的处理,注意这里需要设置tls,因为服务连接需要。

on_stop:当用户结束运行时调用,这里我做了一些其他的处理,比如把运行期间断开连接的车辆信息写到本地csv中。

TaskSet类

定义好User类,就需要来定义TaskSet类,你得告诉产生出来的用户,要干点啥。

我这根据业务需要,就是让车辆不停的像broker发送数据即可。

红色部分,同样是paho.mqtt提供的能力,会启动新的线程去执行你定义的事情。

黄色部分,就是做发送数据的操作,并且我可以拿到一些返回,查看源码就可以知道返回的是MQTTMessageInfo类。

注意返回的2个属性:

mid: 返回这个消息发送的顺序 rc: 表示发送的响应状态,0 就是成功

绿色部分,还记得我在上面的User类中定义了一个容器,在这里就把发送的消息相关信息放到容器中去,留着后面使用。

2. 代码分析-paho.mqtt库部分

上面的代码已经用到了不少paho.mqtt的能力,这里再进行整体梳理下。

client.Client():声明一个client client.username_pw_set(): 设置客户端的用户名,密码 client.tls_set_context: 设置ssl模式 client.connect(): 连接代理 client.publish:向代理推送消息

还用到了一些回调函数:

on_connect:连接操作成功时回调 on_publish:发布成功时回调 on_disconnect:客户端与代理断开连接时回调

另外还用到了一个事件函数events.request。

当客户端发送请求时会调用,不管是请求成功还是请求失败;当我需要自定义我的报告内容时,就需要用到这个event。

查看源码,知道里面要传哪些参数,那我们在调用时候就需要传入对应的参数。

比如我在发送回调函数里调用了该方法。

所以最后在控制台显示的报告里就有我定义的内容了。

由于后来在使用中发现,不知道会在什么时候出现批量断开的情况,于是在on_disconnect回调函数里增加了对应处理,把相关的断开信息记录下来,运行结束的时候写到本地文件里去。

后来我主动尝试客户端断开的情况测试了下文件的写入结果,功能正常。

三、小结

后面就开始运行了,在运行过程中,开发关注链路服务的各项指标,这里就不展开了,业务缠身就并没有过多的去做这个事情,况且也不专业。确实也发现了不少问题,后面逐步优化,再继续测试。

现在稳定运行12h,服务正常,暂时就先告一段落了。后面还有会相关其他性能测试场景,届时就可以针对性的展开分享下了。

另外,这个脚本分享也只是仅供参考,现在我这是使用简单,本着能用就行,可能存在一些不合理需要优化的地方,有需要的朋友还请自行查阅相关文档。



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3